package com.flow.framework.base.system.checker.impl;

import com.flow.framework.common.health.ServiceHealthCheckCode;
import com.flow.framework.common.util.verify.VerifyUtil;
import com.flow.framework.core.system.checker.AbstractSystemStatusChecker;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.composite.CompositeDiscoveryClient;
import org.springframework.core.env.Environment;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * 基于注册中心调用的RPC连接健康检查
 *
 * @author luoguopiao
 * @version 0.0.1
 * @date 2022/12/17
 */
@Slf4j
public class DiscoveryRemoteStatusChecker extends AbstractSystemStatusChecker {

    private CompositeDiscoveryClient discoveryClient;

    private Environment environment;

    private Set<String> serviceNamesOrPlaceHolders = new HashSet<>();

    public DiscoveryRemoteStatusChecker(CompositeDiscoveryClient discoveryClient, Environment environment,
                                        Set<String> serviceNamesOrPlaceHolders) {
        this.discoveryClient = discoveryClient;
        this.environment = environment;
        if (!VerifyUtil.isEmpty(serviceNamesOrPlaceHolders)) {
            this.serviceNamesOrPlaceHolders.addAll(serviceNamesOrPlaceHolders);
        }
    }

    /**
     * @inheritDoc
     */
    @Override
    public int getServiceHealthCheckCode() {
        return ServiceHealthCheckCode.SERVICE_INNER_NET_CODE;
    }

    /**
     * @inheritDoc
     */
    @Override
    protected Set<String> executeAsyncHealthCheck() {
        if (VerifyUtil.isEmpty(serviceNamesOrPlaceHolders)) {
            return Collections.emptySet();
        }
        Set<String> unhealthyTags = new HashSet<>();
        for (String serviceNamesOrPlaceHolder : serviceNamesOrPlaceHolders) {
            String serviceName = environment.resolvePlaceholders(serviceNamesOrPlaceHolder);
            List<ServiceInstance> instances = discoveryClient.getInstances(serviceName);
            if (VerifyUtil.isEmpty(instances)) {
                log.error("check rpc error, service name: {}", serviceName);
                unhealthyTags.add(serviceName);
                continue;
            }
            for (ServiceInstance serviceInstance : instances) {
                String host = serviceInstance.getHost();
                int port = serviceInstance.getPort();
                Socket socket = new Socket();
                try {
                    socket.connect(new InetSocketAddress(host, port), 1000);
                } catch (IOException e) {
                    log.error("check rpc error, service name: {}", serviceName);
                    unhealthyTags.add(serviceName + ":" + host + ":" + port);
                } finally {
                    try {
                        socket.close();
                    } catch (IOException ignore) {

                    }
                }
            }
        }
        return unhealthyTags;
    }

    /**
     * @inheritDoc
     */
    @Override
    public long getInitialDelay() {
        return 0;
    }

    /**
     * @inheritDoc
     */
    @Override
    public long getPeriod() {
        return 30000;
    }
}
